使用DataX同步MySQL数据

本文介绍如何使用 DataX 工具将 MySQL 数据库中的数据同步到表格存储(Tablestore)。

背景信息

DataX 是阿里云的离线数据同步工具,它通过 JDBC 连接 MySQL 数据库,发送 SQL 语句获取数据缓存在本地 JVM中,然后通过 Writer 线程将数据写入到表格存储的数据表中。如果想了解更多关于DataX的介绍,请参见DataX

准备工作

  • 准备需要同步的 MySQL 信息,包括用户名、密码、JDBC 连接信息等。

  • 开通表格存储服务,创建实例和数据表,用于存放同步的数据。具体操作,请参见开通服务并创建实例创建数据表

    重要

    创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。本文使用的MySQL和表格存储数据表样例,请参见附录-样例数据

  • 获取表格存储的服务地址(Endpoint)和实例名称。

    1. 登录表格存储控制台

    2. 在页面上方,选择资源组和地域。

    3. 概览页面,单击实例别名或在操作列单击实例管理

    4. 实例详情页签,查看实例的名称和服务地址。

  • 获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限( AliyunOTSFullAccess )并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储

操作步骤

本文操作使用的服务器为云服务器ECS,操作系统为Alibaba Cloud Linux 3.2104 LTS 64位和Ubuntu 22.04 64位。

一、安装依赖

  1. 安装Python(Python 2Python 3都可以)。

    ECSAlibaba Cloud LinuxUbuntu系统已自带Python 3,如果您使用的是其他服务器,请自行进行安装。

  2. 安装JDK(1.8及以上,推荐1.8)。

    本文介绍在ECSAlibaba Cloud LinuxUbuntu系统中安装JDK 1.8的方法,如果您使用的是其他服务器,请自行进行安装。

    Alibaba Cloud Linux

    yum -y install java-1.8.0-openjdk-devel.x86_64

    Ubuntu

    apt update && apt upgrade
    apt install openjdk-8-jdk

二、安装DataX

  1. 下载DataX工具包。

    wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
  2. 解压工具包。

    tar -zxvf datax.tar.gz

如果有自行编译DataX的需要,请参见DataX安装指引

三、编写配置文件

  1. 进入DataXbin 目录。

    cd datax/bin
  2. 创建配置文件。如果您使用的是 vim,请自行替换命令。

    vi mysql_to_ots.json

    配置文件参考如下,MySQL读取数据有两种模式,请按需选择。

    • querySQL模式:通过SQL语句查询需要导出的数据,支持联表查询。

    • table模式:通过指定表名、列名和where条件确定需要导出的数据,DataX会根据上述信息自动拼接SQL语句并抽取数据。该模式还可以通过数据分片实现并发同步。

    querySQL模式

    请根据您的同步信息和需求替换配置文件内的参数信息。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "mysql_username",
                "password": "mysql_password",
                "connection": [
                  {
                    "querySql": [
                      "select * from table_name"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://server_ip:3306/database_name?useSSL=false"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"endpoint",
                "accessId":"accesskey_id",
                "accessKey":"accesskey_secret",
                "instanceName":"instance_name",
                "table":"table_name",
                "primaryKey":[
                  {"name":"order_id", "type":"string"}
                ],
                "column":[
                  {"name":"user_id","type":"string"},
                  {"name":"sku_id","type":"string"},
                  {"name":"price","type":"double"},
                  {"name":"num","type":"int"},
                  {"name":"total_price","type":"double"},
                  {"name":"order_status","type":"string"},
                  {"name":"create_time","type":"string"},
                  {"name":"modified_time","type":"string"}
                ],
                "writeMode":"UpdateRow"
              }
            }
          }
        ]
      }
    }

    MySQL Reader需要替换的参数说明如下:

    参数名称

    说明

    username

    MySQL 连接用户名。

    password

    MySQL 连接用户密码。

    querySql

    查询SQL,用于确定需要同步的数据范围。

    jdbcUrl

    JDBC连接信息。

    关于MySQL Reader的更多信息,请参见MySQL Reader介绍

    表格存储Writer需要替换的参数说明如下:

    参数名称

    说明

    endpoint

    实例服务地址。

    accessId

    阿里云账号或RAM用户的AccessKey ID。

    accessKey

    阿里云账号或RAM用户的AccessKey Secret。

    instanceName

    实例名称。

    table

    目标表名称。

    primaryKey

    目标表的主键列列表。

    column

    需要写入的属性列。

    关于表格存储Writer的更多信息,请参见表格存储Writer介绍

    table模式

    请根据您的同步信息和需求替换配置文件内的参数信息。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0.02
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "mysql_username",
                "password": "mysql_password",
                "column": [
                  "order_id",
                  "user_id" ,
                  "sku_id" ,
                  "price",
                  "num",
                  "total_price",
                  "order_status",
                  "create_time",
                  "modified_time"
                ],
                "splitPk": "num",
                "connection": [
                  {
                    "table": [
                      "table_name"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://server_ip:3306/database_name?useSSL=false"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"endpoint",
                "accessId":"accesskey_id",
                "accessKey":"accesskey_secret",
                "instanceName":"instance_name",
                "table":"table_name",
                "primaryKey":[
                  {"name":"order_id", "type":"string"}
                ],
                "column":[
                  {"name":"user_id","type":"string"},
                  {"name":"sku_id","type":"string"},
                  {"name":"price","type":"double"},
                  {"name":"num","type":"int"},
                  {"name":"total_price","type":"double"},
                  {"name":"order_status","type":"string"},
                  {"name":"create_time","type":"string"},
                  {"name":"modified_time","type":"string"}
                ],
                "writeMode":"UpdateRow"
              }
            }
          }
        ]
      }
    }

    MySQL Reader需要替换的参数说明如下:

    参数名称

    说明

    username

    MySQL 连接用户名。

    password

    MySQL 连接用户密码。

    column

    需要同步的源表字段。

    splitPk

    数据分片的字段,只在channel数不为1时按指定的字段进行数据分片查询。

    说明

    DataX数据分片支持整型和字符串类型的字段,仅推荐按整型字段进行数据分片。

    table

    MySQL源表表名。

    jdbcUrl

    JDBC连接信息。

    关于MySQL Reader的更多信息,请参见MySQL Reader介绍

    表格存储Writer需要替换的参数说明如下:

    参数名称

    说明

    endpoint

    实例服务地址。

    accessId

    阿里云账号或RAM用户的AccessKey ID。

    accessKey

    阿里云账号或RAM用户的AccessKey Secret。

    instanceName

    实例名称。

    table

    目标表名称。

    primaryKey

    目标表的主键列列表。

    column

    需要写入的属性列。

    关于表格存储Writer的更多信息,请参见表格存储Writer介绍

四、执行同步命令

  1. 执行以下命令开始同步数据。

    python3 datax.py mysql_to_ots.json

    同步任务结束后,将打印整体运行情况。

    2025-02-10 18:02:17.355 [job-0] INFO  JobContainer -
    任务启动时刻                    : 2025-02-10 18:02:06
    任务结束时刻                    : 2025-02-10 18:02:17
    任务总计耗时                    :                 11s
    任务平均流量                    :          871.56KB/s
    记录写入速度                    :          10000rec/s
    读出记录总数                    :              100000
    读写失败总数                    :                   0
  2. 验证同步结果。

    您可以前往表格存储控制台查看已导入的数据。

相关文档

  • 表格存储数据列支持的数据类型,请参见数据类型

附录-样例数据

MySQL源表

MySQL数据样例源表建表语句如下:

create table orders (
  order_id varchar(50) primary key comment '订单ID',
  user_id varchar(10) not null comment '用户ID',
  sku_id varchar(10) not null comment '商品ID',
  price decimal(12, 2) not null comment '商品购买单价',
  num int not null comment '商品购买数量',
  total_price decimal(12, 2) not null comment '订单总价',
  order_status varchar(2) not null comment '订单状态',
  create_time timestamp not null default current_timestamp comment '订单创建时间',
  modified_time timestamp not null default current_timestamp on update current_timestamp comment '最后修改时间'
);

表格存储目标表

由于表格存储是schema-free的,创建目标表时仅需指定主键 order_id 即可。目标表基本详情如下:

image